
Conversation

@dandavison dandavison commented Sep 12, 2025

Fixes #796

Problem statement

  • We must make it possible for users to create custom data converters whose methods have access to relevant context. In other words, when implementing any of the converter interfaces (payload converter, failure converter, payload codec), the user must be able to implement methods that have access to a context object.
  • Existing custom data converters must continue to work. So we cannot add parameters to the interface signatures and change call sites to pass a new argument.

Solution

  1. At every point where the data converter is used, we construct a context-aware data converter by calling data_converter._with_context(context).
    • Call sites:
      • At many locations in client.py when we know a workflow ID, and in the with_context method of AsyncActivityHandle to allow custom context
      • In the workflow worker when handling an activation (codec + failure conversion)
        • At various points in workflow code inside the sandbox (payload/failure conversion)
          • using the workflow context in most places, but the activity / child workflow / external workflow context when working with those APIs.
      • In the activity worker when starting an activity and heartbeating
      • When constructing the data converter instance exposed to users in workflow and activity code TODO: test this
  2. Users take advantage of this by implementing with_context methods on any of the following components:
    • payload converter
    • failure converter
    • payload codec
  3. In addition, users may implement with_context on an EncodingPayloadConverter. These are used by the SDK’s default payload converter.
  4. In addition, we provide a with_context method on AsyncActivityHandle that users can call to supply their own activity context.
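The with_context pattern in steps 2–4 can be sketched with stdlib-only stand-ins (the names mirror the real temporalio.converter types, but this is an illustrative sketch, not the SDK implementation): with_context returns a new instance bound to the context, so existing components that never implement it keep working unchanged.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical minimal version of the workflow context type.
@dataclass(frozen=True)
class WorkflowSerializationContext:
    namespace: str
    workflow_id: str

class MyPayloadCodec:
    """Sketch of a context-aware component."""

    def __init__(self, context: Optional[WorkflowSerializationContext] = None):
        self.context = context

    def with_context(self, context: WorkflowSerializationContext) -> "MyPayloadCodec":
        # Return a NEW instance bound to the context; never mutate self.
        return type(self)(context)

base = MyPayloadCodec()
bound = base.with_context(WorkflowSerializationContext("default", "wf-1"))
print(base.context, bound.context.workflow_id)  # None wf-1
```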

Details

Python Serialization Context

Background

Data converter

Recall that a DataConverter is a dataclass with three fields:

  1. payload_converter_class: a nullary factory function returning a PayloadConverter
  2. failure_converter_class: a nullary factory function returning a FailureConverter
  3. payload_codec: an optional PayloadCodec

For the payload converter and failure converter, the names and type annotations indicate that the user supplies a subclass of PayloadConverter / FailureConverter, as opposed to a generic callable.
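The dataclass shape described above can be sketched with stdlib-only stand-ins (the component classes here are hypothetical placeholders; the real ones live in temporalio.converter):

```python
from dataclasses import dataclass
from typing import Callable, Optional

# Hypothetical minimal stand-ins for the three component types.
class PayloadConverter: ...
class FailureConverter: ...
class PayloadCodec: ...

# The converter classes are stored as nullary factories (called with no
# arguments to build fresh instances), while the codec is stored as an instance.
@dataclass(frozen=True)
class DataConverter:
    payload_converter_class: Callable[[], PayloadConverter] = PayloadConverter
    failure_converter_class: Callable[[], FailureConverter] = FailureConverter
    payload_codec: Optional[PayloadCodec] = None

converter = DataConverter()
print(type(converter.payload_converter_class()).__name__)  # PayloadConverter
```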

Example usage

At various points in client and worker code, these three components are used to transform outbound and inbound data. For example, here is client code converting outbound workflow input args to Payloads:

            req.input.payloads.extend(
                await self._client.data_converter.encode(input.args)
            )

This calls DataConverter.encode(), which performs both serialization and encoding:

        payloads = self.payload_converter.to_payloads(values)
        if self.payload_codec:
            payloads = await self.payload_codec.encode(payloads)
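The two-step pipeline can be illustrated with a stdlib-only sketch (these helpers are simplified stand-ins, not the real SDK classes): payload conversion serializes values to bytes, then the optional codec transforms those bytes.

```python
import asyncio
import base64
import json

def to_payloads(values):
    # Payload conversion: Python values -> serialized bytes.
    return [json.dumps(v).encode() for v in values]

async def codec_encode(payloads):
    # Payload codec: bytes -> transformed bytes (here, base64 as a toy example).
    return [base64.b64encode(p) for p in payloads]

async def encode(values, use_codec=True):
    payloads = to_payloads(values)
    if use_codec:
        payloads = await codec_encode(payloads)
    return payloads

print(asyncio.run(encode([{"x": 1}])))
```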

Here’s an example of worker code using a failure converter:

class _WorkflowWorker:
    async def _handle_activation(
        self, act: temporalio.bridge.proto.workflow_activation.WorkflowActivation
    ) -> None:
        try:
            ...
        except Exception as err:
            self._data_converter.failure_converter.to_failure(
                err,
                self._data_converter.payload_converter,
                completion.failed.failure,
            )

Interfaces

Users can replace any of the three fields on the default data converter.

Replacing the payload codec or failure converter is straightforward.

Payload codec

For the payload codec, the user must implement the following interface (codecs conventionally set an "encoding" metadata field on the payloads they produce):

async def encode(self, payloads: Iterable[Payload]) -> List[Payload]
async def decode(self, payloads: Iterable[Payload]) -> List[Payload]
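For illustration, here is a stdlib-only sketch of a codec that compresses payload bytes. Payload here is a hypothetical stand-in for the real protobuf message (which has metadata and data fields), and ZlibCodec is an invented example, not an SDK class:

```python
import asyncio
import zlib
from typing import Iterable, List, Optional

class Payload:
    # Hypothetical stand-in for temporalio.api.common.v1.Payload.
    def __init__(self, data: bytes, metadata: Optional[dict] = None):
        self.data = data
        self.metadata = metadata or {}

class ZlibCodec:
    async def encode(self, payloads: Iterable[Payload]) -> List[Payload]:
        return [
            Payload(zlib.compress(p.data), {**p.metadata, "encoding": b"binary/zlib"})
            for p in payloads
        ]

    async def decode(self, payloads: Iterable[Payload]) -> List[Payload]:
        return [Payload(zlib.decompress(p.data), p.metadata) for p in payloads]

async def main():
    codec = ZlibCodec()
    encoded = await codec.encode([Payload(b"hello" * 100)])
    decoded = await codec.decode(encoded)
    assert decoded[0].data == b"hello" * 100
    print("round trip ok")

asyncio.run(main())
```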

Failure converter

Failure is the name of the proto message we use for representing failures across languages. For the failure converter, the user must implement:

    @abstractmethod
    def to_failure(
        self,
        exception: BaseException,
        payload_converter: PayloadConverter,
        failure: temporalio.api.failure.v1.Failure,
    ) -> None:

    @abstractmethod
    def from_failure(
        self,
        failure: temporalio.api.failure.v1.Failure,
        payload_converter: PayloadConverter,
    ) -> BaseException:
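To make the shape concrete, here is a sketch using a plain dict in place of the Failure proto (MyFailureConverter and the dict fields are hypothetical stand-ins). Note that to_failure mutates the passed-in failure object rather than returning one:

```python
from typing import Any

class MyFailureConverter:
    def to_failure(
        self, exception: BaseException, payload_converter: Any, failure: dict
    ) -> None:
        # Mutates `failure` in place, mirroring the proto-based interface above.
        failure["message"] = str(exception)
        failure["type"] = type(exception).__name__

    def from_failure(self, failure: dict, payload_converter: Any) -> BaseException:
        return RuntimeError(f"{failure['type']}: {failure['message']}")

fc = MyFailureConverter()
failure: dict = {}
fc.to_failure(ValueError("boom"), None, failure)
err = fc.from_failure(failure, None)
print(type(err).__name__, err)  # RuntimeError ValueError: boom
```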

Payload converter

The PayloadConverter interface is:

    def to_payloads(
        self, values: Sequence[Any]
    ) -> List[temporalio.api.common.v1.Payload]:

    def from_payloads(
        self,
        payloads: Sequence[temporalio.api.common.v1.Payload],
        type_hints: Optional[List[Type]] = None,
    ) -> List[Any]:

By default, the payload converter is a CompositePayloadConverter, which contains multiple EncodingPayloadConverters that are tried in order until one succeeds. To replace the payload converter, a user normally creates a subclass of CompositePayloadConverter that prepends a custom EncodingPayloadConverter onto the existing collection.

An EncodingPayloadConverter is an interface with a string encoding property and the following methods:

    @abstractmethod
    def to_payload(self, value: Any) -> Optional[temporalio.api.common.v1.Payload]:

    @abstractmethod
    def from_payload(
        self,
        payload: temporalio.api.common.v1.Payload,
        type_hint: Optional[Type] = None,
    ) -> Any:
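The try-in-order behavior can be sketched as follows (simplified stand-ins for the SDK types; a converter signals "can't handle this value" by returning None from to_payload):

```python
import json
from typing import Any, List, Optional

class Payload:
    # Hypothetical stand-in for the Payload proto.
    def __init__(self, metadata: dict, data: bytes):
        self.metadata = metadata
        self.data = data

class ComplexEncodingConverter:
    # Custom converter handling only complex numbers.
    encoding = "my/complex"

    def to_payload(self, value: Any) -> Optional[Payload]:
        if isinstance(value, complex):
            data = json.dumps([value.real, value.imag]).encode()
            return Payload({"encoding": self.encoding.encode()}, data)
        return None  # defer to the next converter

class JSONEncodingConverter:
    encoding = "json/plain"

    def to_payload(self, value: Any) -> Optional[Payload]:
        return Payload({"encoding": self.encoding.encode()}, json.dumps(value).encode())

class Composite:
    def __init__(self, *converters):
        self.converters = converters

    def to_payloads(self, values) -> List[Payload]:
        out = []
        for v in values:
            for c in self.converters:
                p = c.to_payload(v)
                if p is not None:  # first converter that succeeds wins
                    out.append(p)
                    break
        return out

converter = Composite(ComplexEncodingConverter(), JSONEncodingConverter())
payloads = converter.to_payloads([1 + 2j, "plain"])
print([p.metadata["encoding"] for p in payloads])  # [b'my/complex', b'json/plain']
```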

with_context on a payload converter

Suppose a user implements

class MyPayloadConverter(PayloadConverter, WithSerializationContext):
    def to_payloads(
        self, values: Sequence[Any]
    ) -> List[temporalio.api.common.v1.Payload]:
        ...  # custom behavior

    def from_payloads(
        self,
        payloads: Sequence[temporalio.api.common.v1.Payload],
        type_hints: Optional[List[Type]] = None,
    ) -> List[Any]:
        ...  # custom behavior

    def with_context(self, context: SerializationContext) -> MyPayloadConverter:
        # construct a copy of this converter bound to `context`
        ...
        return new_instance

The SDK will then call their with_context() when constructing the payload converter that is actually used.

with_context on an EncodingPayloadConverter

However, to customize a payload converter, a user normally creates a subclass of CompositePayloadConverter that prepends a custom EncodingPayloadConverter onto the existing collection. We will make it possible for users to implement context-aware to_payload and from_payload methods on their EncodingPayloadConverter.

For this, the SDK must implement with_context on CompositePayloadConverter such that:

  • It calls with_context on each of the user's EncodingPayloadConverters that implements it
  • It returns an instance of the user’s class in case they have subclassed CompositePayloadConverter
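A sketch of how those two requirements might be met (illustrative names, not the SDK's exact internals; it assumes, as the SDK does, that subclasses have a nullary constructor):

```python
class EncodingConverter:
    def __init__(self, context=None):
        self.context = context

    def with_context(self, context):
        return EncodingConverter(context)

class CompositePayloadConverter:
    def __init__(self):
        self.converters = [EncodingConverter()]

    def with_context(self, context):
        # type(self)() preserves the user's subclass; requires a nullary ctor.
        new_instance = type(self)()
        new_instance.converters = [
            c.with_context(context) if hasattr(c, "with_context") else c
            for c in self.converters
        ]
        return new_instance

class UserComposite(CompositePayloadConverter):
    pass

bound = UserComposite().with_context("some-context")
print(type(bound).__name__, bound.converters[0].context)  # UserComposite some-context
```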

Note

Introduce serialization context (workflow/activity) and propagate it across payload/failure conversion and codecs throughout client and workers; add AsyncActivityHandle.with_context and extensive tests.

  • Converter/Core:
    • Add SerializationContext, WorkflowSerializationContext, ActivitySerializationContext, and WithSerializationContext.
    • Implement DataConverter.with_context, CompositePayloadConverter.with_context, and context propagation to payload/failure converters and codecs.
  • Worker:
    • Introduce CommandAwarePayloadVisitor and contextvar-based command tracking.
    • Use context-aware codec/failure conversion in workflow worker; add _CommandAwarePayloadCodec.
    • Set activity serialization context for execution and heartbeats.
  • Client:
    • Use context-aware data converter for start/signal/query/update/terminate, schedule creation, listing/describe, and workflow result/failure decoding.
    • Add WorkflowExecution.data_converter (contextual) and refactor related constructors.
    • Add AsyncActivityHandle.with_context and support per-call data converter overrides for async activity APIs.
  • APIs/Docs:
    • workflow.payload_converter() and activity.payload_converter() now return converters with context set.
  • Tests:
    • Add comprehensive tests for serialization context, command-aware visitor coverage, codec behavior, and pydantic integration.
  • CI:
    • Enable Rust clippy in latest-deps job.

Written by Cursor Bugbot for commit c78016e.

dandavison force-pushed the dan-9986-serialization-context branch from 56db57e to 3e53f35 on September 12, 2025 22:22
dandavison changed the title from "Dan 9986 serialization context" to "Serialization context" on September 12, 2025
dandavison force-pushed the dan-9986-serialization-context branch 11 times, most recently from 8e0e193 to 4a0b661 on September 15, 2025 09:15
dandavison marked this pull request as ready for review on September 15, 2025 09:17
dandavison requested a review from a team as a code owner on September 15, 2025 09:17
dandavison force-pushed the dan-9986-serialization-context branch from 4a0b661 to 6000b0d on September 15, 2025 10:44
timeout=input.rpc_timeout,
)

def _async_activity_data_converter(
Contributor

This one seems odd. Is this when the client comes back to complete an activity out of band, they don't give us a lot of the information?

Contributor Author

You're right, this was not how it should be. Changed now (following C#/Java, as suggested in #1102 (comment)) so that the async activity handle itself implements WithSerializationContext, allowing users to supply context fields matching those that will be used on receipt of the payload by the activity worker.

workflow = _RunningWorkflow(
self._create_workflow_instance(act, init_job)
)
workflow_instance, det = self._create_workflow_instance(act, init_job)
Contributor

This seems potentially problematic. The act given to create workflow instance is now given prior to decoding. Maybe not a problem, but is there a reason to change it?

Contributor Author

It doesn't currently need to be decoded at this stage but I think you're right that this can be done less intrusively: we can get the workflow ID from init_job.workflow_id. I'll make that change.

Member

It does technically need to be decoded at this stage because info needs decoded memo and headers

Contributor Author

This section has been rewritten and the concerns here should be resolved now.

WorkflowExecution._from_raw_info(v, self._client.data_converter)
WorkflowExecution._from_raw_info(
v,
self._client.data_converter._with_context(
Member

Do we want to create a new context-specific converter for each page here? No strong opinion.

Contributor Author

Agree, it feels better not to since the common case is all the same workflow. I've memoized it.

) -> None:
"""Create workflow handle."""
self._client = client
self._data_converter = client.data_converter._with_context(
Member

This class is often created by people that don't care about it (e.g. they start a workflow and don't care about the handle). Are there concerns about creating a context-specific data converter in all cases even if it's never used? I wonder if we should build the converter each call when they make the call, same as top-level client calls.

Contributor Author

I've made it a lazily-computed property.

I don't think we need to construct it on every call, since users should not expect to rely on dynamic behavior like that, and I don't think we should inline repetitive code. It does no I/O, so it's not obvious that we should optimize performance here, but lazy construction is reasonable: the constructor previously called no functions and, as you say, the converter may very well never be used.

return self._client.data_converter._with_context(
ActivitySerializationContext(
namespace=self._client.namespace,
workflow_id=(
Member

What we did here in Java and .NET is had this async activity client/handle implement WithSerializationContext since we don't always have the workflow ID. This way, users who know some of this information can do a "with context" to get context-specific async activity client, and they can choose which fields they are ok being empty and such. The task token approach is by far the most common approach (though in general async activity completion is not that common), so I think we may need to just put this in front of users to let them set the context.

Contributor Author (Sep 24, 2025)

Thanks, I've done that, following .NET.

Comment on lines 126 to 127
namespace: str
workflow_id: str
Member

In .NET and Java we had a common interface for both workflow and activity serialization context to show they both had these two fields. No problem not doing here, just noting it if you wanted to.

Contributor Author

Good call. I agree, since it's relatively important for users of this feature to understand that the workflow ID and namespace are available in both, a shared class makes sense. Done.

during serialization and deserialization.
"""

def with_context(self, context: Optional[SerializationContext]) -> Self:
Member

At least in other SDKs, I am not sure there is ever expected to be a situation where this is called with None. With context always assumes it will be with a context and developers don't have to code around the absence of one.

Contributor Author

Removed Optional here. In a previous version of the PR I was passing None to Nexus contexts but not any longer.

Comment on lines 255 to 265
data_converter = self._data_converter
if activity.info:
context = temporalio.converter.ActivitySerializationContext(
namespace=activity.info.workflow_namespace,
workflow_id=activity.info.workflow_id,
workflow_type=activity.info.workflow_type,
activity_type=activity.info.activity_type,
activity_task_queue=self._task_queue,
is_local=activity.info.is_local,
)
data_converter = data_converter._with_context(context)
Member

Is there a way to get this off the running activity instead of recreating every heartbeat? I know we store the payload converter on the activity context which is being accessed here, maybe we should store the data converter instead and just return its payload converter from activity.payload_converter()? I am unsure if this affects how multiprocessing and pickling work.

Contributor Author

Good call, but it's not an immediate performance concern, and a non-trivial refactor, so I think we should leave it for a follow-on PR. Here's an untested branch: dan-9986-serialization-context...dan-9986-serialization-context-activity-context-dataconverter

workflow = _RunningWorkflow(
self._create_workflow_instance(act, init_job)
)
workflow_instance, det = self._create_workflow_instance(act, init_job)
Member

I am concerned with the refactoring that act (and its init_job) has not run through the codec by this point where it had before. I think the logic needs to stay using the codec before workflow instance creation code, but you need to extract the workflow ID from the init_job or the running workflow.

Contributor Author

Agreed, we're doing that now.

act: temporalio.bridge.proto.workflow_activation.WorkflowActivation,
init: temporalio.bridge.proto.workflow_activation.InitializeWorkflow,
) -> WorkflowInstance:
) -> tuple[WorkflowInstance, WorkflowInstanceDetails]:
Member

Not sure we need to change the entire return type here just to get the workflow ID. The caller can just extract it out of the init job and we don't have to mutate this code at all.

Contributor Author

Correct, the changes to this function have been reverted in connection to discussion above.

Comment on lines 213 to 214
self._payload_converter_class = det.payload_converter_class
self._failure_converter_class = det.failure_converter_class
Member

Not sure we need to store these. The "with context" can be called on the already-created converters, we should not re-instantiate converters more than once per instance IMO

Contributor Author

Discussed offline and resolved in recent commits

Comment on lines 2066 to 2067
payload_converter = self._payload_converter_class()
failure_converter = self._failure_converter_class()
Member

Mentioned above, but I don't believe we need to reinstantiate converters multiple times in this instance, just call the "with context" on the already existing ones.

Contributor Author

Discussed offline and resolved in recent commits

dandavison force-pushed the dan-9986-serialization-context branch 2 times, most recently from adf8bc3 to d3311e6 on September 19, 2025 12:17

dandavison marked this pull request as draft on September 22, 2025 17:48
dandavison force-pushed the dan-9986-serialization-context branch 5 times, most recently from 3f41215 to ba1bfbb on September 24, 2025 07:59
dandavison force-pushed the dan-9986-serialization-context branch from 8d7bba3 to d8c4f17 on September 30, 2025 17:35
Member (@cretz) left a comment

Nothing blocking. I do think we added plenty of cycles to user code that don't need it, but they are not enough to block the PR

command.set_patch_marker.deprecated = deprecated
return use_patch

def workflow_payload_converter(self) -> temporalio.converter.PayloadConverter:
Member

Could be worth documenting in activity.payload_converter() and workflow.payload_converter() doc strings that these are context-specific payload converters, but not that important

Contributor Author

Thanks, done.

if isinstance(payload_converter, temporalio.converter.WithSerializationContext):
payload_converter = payload_converter.with_context(context)
if isinstance(failure_converter, temporalio.converter.WithSerializationContext):
failure_converter = failure_converter.with_context(context)
Member

It seems there are several cases where we are calling a user's with context on a failure converter even if we don't need it. It's not a big deal of course, just extra unneeded instantiations.

Contributor Author

I've split the method to construct the payload and failure converter separately.

self,
command_info: Optional[_command_aware_visitor.CommandInfo],
) -> Optional[temporalio.converter.SerializationContext]:
workflow_context = temporalio.converter.WorkflowSerializationContext(
Member

Many times in this method, this context isn't needed, why instantiate it every time as if it's always needed?

Contributor Author

The reason to instantiate it in one location is to avoid repeating code. In general, that's a more important concern than avoiding an extra instantiation of a frozen dataclass holding two already-computed strings.

is_local: bool


# TODO: duck typing or nominal typing?
Contributor

Maybe remove todo

Contributor Author

Good catch, done.

new_instance = type(self)() # Must have a nullary constructor
new_instance._set_converters(*converters)
return new_instance


Bug: Payload Converter Context Propagation Issue

In CompositePayloadConverter.with_context, creating a new instance with type(self)() can break subclasses that have custom constructors or additional state. Furthermore, the _any_converter_takes_context flag isn't updated after setting the new converters, which can lead to with_context incorrectly returning self or failing to propagate the serialization context to its component converters.


dandavison enabled auto-merge (squash) on October 1, 2025 17:16
dandavison merged commit a31886d into main on October 1, 2025; 16 checks passed
dandavison deleted the dan-9986-serialization-context branch on October 1, 2025 17:32
Successfully merging this pull request may close these issues.

[Feature Request] Serialization context for codecs and converters